/***
 * Copyright (c) 2021-2031 murenchao
 * fig is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.software.fig.mqtt.broker

import cool.taomu.software.fig.classloader.FigClassLoaderManage
import cool.taomu.software.fig.classloader.FigClassLoaderManage.Autowired
import cool.taomu.software.fig.configure.ConfigureManage
import cool.taomu.software.fig.mqtt.broker.inter.MQTTSslHandler
import cool.taomu.software.fig.mqtt.utils.CommonUtils
import cool.taomu.software.fig.rpc.RpcServer
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.mqtt.MqttDecoder
import io.netty.handler.codec.mqtt.MqttEncoder
import io.netty.handler.timeout.IdleStateHandler
import org.slf4j.LoggerFactory

class MQTTBroker {
    val static LOG = LoggerFactory.getLogger(MQTTBroker);
    var NioEventLoopGroup selectGroup;
    var NioEventLoopGroup ioGroup;

    @Autowired(MQTTHandler)
    var ChannelInboundHandlerAdapter handler;

    val config = ConfigureManage.loadConfig;

    new() {
        var coreNumber = Runtime.getRuntime().availableProcessors();
        selectGroup = new NioEventLoopGroup(coreNumber);
        ioGroup = new NioEventLoopGroup(coreNumber * 2);
    }

    def startTcpServer() {
        var serverBootstrap = new ServerBootstrap();
        var bootstrap = serverBootstrap.group(selectGroup, ioGroup);
        var channel = bootstrap.channel(NioServerSocketChannel)
        channel.option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.TCP_NODELAY, false).option(
            ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, false);
        channel.childHandler(new ChannelInitializer<SocketChannel>() {
            override protected initChannel(SocketChannel ch) throws Exception {
                var pipeline = ch.pipeline();
                if (config.mqtt.useSsl) {
                    pipeline.addLast("ssl", MQTTSslHandler.build(ch, config));
                }
                pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0));
                pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE);
                pipeline.addLast("mqttDecoder", new MqttDecoder(Integer.MAX_VALUE));
                pipeline.addLast("nettyMqttHandler", handler);
            }
        })
        Runtime.runtime.addShutdownHook(new Thread() {
            override run() {
                selectGroup.shutdownGracefully();
                ioGroup.shutdownGracefully();
            }
        })
        LOG.info("启动MQTT代理服务 ip: {} port: {}", config.mqtt.ip, config.mqtt.port);
        bootstrap.bind(config.mqtt.ip, config.mqtt.port).sync();
    }

    def static start() {
        CommonUtils.exec([
            var fig = new FigClassLoaderManage();
            var mqtt = fig.getBean(MQTTBroker) as MQTTBroker;
            mqtt.startTcpServer();
        ]);
        CommonUtils.exec([
            RpcServer.start();
        ])
    }
    def static void main(String[] args){
       start(); 
    }
}
